package com.atguigu.gmall.cdc;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author: Felix
 * Date: 2022/4/2
 * Desc:  使用FlinkCDC读取MySQL表中数据---API方式实现
 */
public class FlinkCDC01_api {
    public static void main(String[] args) throws Exception {
        //TODO 1.基本环境准备
        //1.1 指定流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 设置并行度
        env.setParallelism(1);
        //TODO 2. 如果开启检查点  FlinkCDC读取到binlog的偏移量会记录到状态中，我们从检查点或者保存点恢复数据
       /* //2.1 开启检查点
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000L);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(30),Time.seconds(3)));
        env.setStateBackend(new FsStateBackend("hdfs://hadoop202:8020/cdc/ck"));
        System.setProperty("HADOOP_USER_NAME","atguigu");*/

        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
            .hostname("hadoop202")
            .port(3306)
            .databaseList("gmall0925_realtime")
            .tableList("gmall0925_realtime.t_user")
            .username("root")
            .password("123456")
            .startupOptions(StartupOptions.initial())
            .deserializer(new StringDebeziumDeserializationSchema())
            .build();
        DataStreamSource<String> mySQLDS = env.addSource(sourceFunction);
        mySQLDS.print(">>>>");
        env.execute();
    }
}
